package org.budo.warehouse.logic.bean;

import java.util.Map;

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.budo.dubbo.protocol.async.config.BudoAsyncReferenceBean;
import org.budo.dubbo.protocol.async.config.BudoAsyncServiceBean;
import org.budo.dubbo.protocol.async.repository.activemq.ActiveMQAsyncRepository;
import org.budo.dubbo.protocol.async.repository.kafka.KafkaAsyncRepository;
import org.budo.support.javax.sql.util.JdbcUtil;
import org.budo.support.lang.util.MapUtil;
import org.budo.support.spring.bean.factory.support.BeanBuilder;
import org.budo.warehouse.logic.api.DataConsumer;
import org.budo.warehouse.logic.consumer.AbstractDataConsumer;
import org.budo.warehouse.logic.consumer.DataEntryPojoConsumer;
import org.budo.warehouse.logic.consumer.jdbc.JdbcDataConsumer;
import org.budo.warehouse.logic.consumer.jdbc.MysqlDataConsumer;
import org.budo.warehouse.logic.producer.WarehouseCanalMessageHandler;
import org.budo.warehouse.logic.util.PipelineUtil;
import org.budo.warehouse.service.entity.DataNode;
import org.budo.warehouse.service.entity.Pipeline;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * Registers beans in the Spring {@link ApplicationContext} at runtime: data
 * sources, data consumers, canal instances/handlers and async (Kafka /
 * ActiveMQ) repositories, service beans and reference beans.
 * <p>
 * Bean ids are derived from {@link DataNode} / {@link Pipeline} values, and a
 * bean is only registered when the context does not already contain that id.
 *
 * @author limingwei
 */
@Slf4j
@Component
public class DynamicBeanService {
    /** URL prefix of a Kafka-backed async data node; the remainder is used as {@code bootstrap.servers}. */
    private static final String KAFKA_URL_PREFIX = "async:kafka://";

    /** URL prefix of an ActiveMQ-backed async data node; the remainder is appended to {@code tcp://}. */
    private static final String ACTIVEMQ_URL_PREFIX = "async:activemq://";

    @Resource
    private ApplicationContext applicationContext;

    /**
     * Picks the consumer implementation for a target data node from the scheme
     * of its URL ({@code jdbc:}, {@code async:kafka://}, {@code async:activemq://}
     * or {@code mail}).
     *
     * @param targetDataNode node the data is written to
     * @param pipeline       pipeline the consumer is created for
     * @return the consumer for the node
     * @throws RuntimeException if the URL is {@code null} or matches no supported scheme
     */
    public DataConsumer dataConsumer(DataNode targetDataNode, Pipeline pipeline) {
        String url = targetDataNode.getUrl();

        // A missing URL is reported like any other unsupported node instead of surfacing as a bare NPE.
        if (null == url) {
            throw new RuntimeException("#49 targetDataNode=" + targetDataNode);
        }

        if (url.startsWith("jdbc:")) {
            return this.jdbcDataConsumer(targetDataNode, pipeline);
        }

        // e.g. kafka:192.168.4.253:9092 activemq:tcp://192.168.4.251:61616
        if (url.startsWith(KAFKA_URL_PREFIX) || url.startsWith(ACTIVEMQ_URL_PREFIX)) {
            DataConsumer asyncDataConsumer = this.asyncDataConsumer(targetDataNode, pipeline);
            return new DataEntryPojoConsumer(asyncDataConsumer, pipeline);
        }

        if (url.startsWith("mail")) { // mail alert
            return this.mailDataConsumer(targetDataNode, pipeline);
        }

        throw new RuntimeException("#49 targetDataNode=" + targetDataNode);
    }

    /**
     * Returns the {@link DataSource} bean for a data node, registering it first
     * (as a child of the {@code abstractDruidDataSource} bean definition) when
     * no bean with the derived id exists yet.
     * <p>
     * The bean id is built from the node's URL and username only.
     *
     * @param dataNode node providing url, username and password
     * @return the data source bean looked up from the application context
     */
    public DataSource dataSource(DataNode dataNode) {
        String beanId = "DataSource-" + dataNode.getUrl() + "-" + dataNode.getUsername();

        if (!applicationContext.containsBean(beanId)) {
            new BeanBuilder() //
                    .id(beanId) //
                    .parent("abstractDruidDataSource") //
                    .propertyValue("url", dataNode.getUrl()) //
                    .propertyValue("username", dataNode.getUsername()) //
                    .propertyValue("password", dataNode.getPassword()) //
                    .registerTo(applicationContext);
        }

        return (DataSource) applicationContext.getBean(beanId);
    }

    /**
     * Registers a {@link WarehouseCanalMessageHandler} bean for the data node,
     * unless a bean with the same id already exists.
     *
     * @param dataNode node whose id is passed to the handler as {@code dataNodeId}
     * @return the bean id of the handler
     */
    private String canalMessageHandler(DataNode dataNode) {
        String canalMessageHandlerId = "canalMessageHandler-" + dataNode.getUrl();

        // Same guard as the other registrations in this class: repeated calls keep the first registration.
        if (!applicationContext.containsBean(canalMessageHandlerId)) {
            new BeanBuilder() //
                    .id(canalMessageHandlerId) //
                    .type(WarehouseCanalMessageHandler.class) //
                    .property("dataNodeId", dataNode.getId() + "") //
                    .registerTo(this.applicationContext);
        }

        return canalMessageHandlerId;
    }

    /**
     * Registers a canal instance bean (child of the {@code abstractCanalInstance}
     * bean definition) for the data node, wired to the node's canal message
     * handler. Does nothing when a bean with the derived id already exists.
     * <p>
     * Host and port are taken from the node's URL via {@link JdbcUtil}; a
     * missing port falls back to 3306 and a missing password to the empty string.
     *
     * @param dataNode source node to read from
     */
    public void canalDataProducer(DataNode dataNode) {
        String canalMessageHandlerId = this.canalMessageHandler(dataNode);

        // The id is also used as a destination and as a zk path, which restricts
        // the characters it may contain, so ":", "." and "//" are stripped.
        // NOTE(review): a single "/" inside the URL is left in place - confirm that is acceptable.
        String canalInstanceId = ("canalDataProducer-" + dataNode.getUrl()) //
                .replace(":", "") //
                .replace(".", "") //
                .replace("//", "");

        // Same guard as the other registrations in this class.
        if (applicationContext.containsBean(canalInstanceId)) {
            return;
        }

        Integer port = JdbcUtil.getPort(dataNode.getUrl());
        port = null == port ? 3306 : port;

        String password = dataNode.getPassword();
        password = null == password ? "" : password;

        new BeanBuilder() //
                .id(canalInstanceId) //
                .parent("abstractCanalInstance") //
                .property("host", JdbcUtil.getHost(dataNode.getUrl())) //
                .property("port", port + "") //
                .property("username", dataNode.getUsername()) //
                .property("password", password) //
                .ref("canalMessageHandler", canalMessageHandlerId) //
                .registerTo(applicationContext);
    }

    /**
     * Returns the JDBC consumer bean for a node/pipeline pair, registering it
     * on first use. {@link MysqlDataConsumer} is used for {@code jdbc:mysql://}
     * URLs and {@link JdbcDataConsumer} for every other JDBC URL.
     *
     * @param dataNode target node, injected into the consumer as {@code dataNode}
     * @param pipeline pipeline, injected into the consumer as {@code pipeline}
     * @return the consumer bean looked up from the application context
     */
    private DataConsumer jdbcDataConsumer(DataNode dataNode, Pipeline pipeline) {
        String beanId = "DataConsumer-" + dataNode.getUrl() + "-" + dataNode.getUsername() //
                + "-" + pipeline.getSourceTable() + "-" + pipeline.getTargetSchema() + "-" + pipeline.getTargetTable();

        if (!applicationContext.containsBean(beanId)) {
            Class<?> type = dataNode.getUrl().startsWith("jdbc:mysql://") //
                    ? MysqlDataConsumer.class //
                    : JdbcDataConsumer.class;

            new BeanBuilder() //
                    .id(beanId) //
                    .type(type) //
                    .propertyValue("dataNode", dataNode) //
                    .propertyValue("pipeline", pipeline) //
                    .registerTo(applicationContext);
        }

        return (DataConsumer) applicationContext.getBean(beanId);
    }

    /**
     * Creates the consumer used for {@code mail} targets.
     * <p>
     * Currently returns a new {@link AbstractDataConsumer}; neither argument is used.
     * NOTE(review): looks like a placeholder for a real mail implementation - confirm.
     *
     * @param targetDataNode unused
     * @param pipeline       unused
     * @return a new {@link AbstractDataConsumer}
     */
    private DataConsumer mailDataConsumer(DataNode targetDataNode, Pipeline pipeline) {
        return new AbstractDataConsumer();
    }

    /**
     * Resolves (and registers if necessary) the async repository bean matching
     * the node's URL scheme.
     *
     * @param dataNode node whose URL starts with one of the async prefixes
     * @return bean id of the async repository
     * @throws RuntimeException if the URL is {@code null} or is neither a Kafka nor an ActiveMQ async URL
     */
    private String asyncRepository(DataNode dataNode) {
        String url = dataNode.getUrl();

        if (null != url && url.startsWith(KAFKA_URL_PREFIX)) {
            return this.kafkaAsyncRepository(dataNode);
        }

        if (null != url && url.startsWith(ACTIVEMQ_URL_PREFIX)) {
            return this.activemqAsyncRepository(dataNode);
        }

        throw new RuntimeException("#139 dataNode=" + dataNode);
    }

    /**
     * Registers a {@link KafkaAsyncRepository} bean for the node if none exists
     * yet. The part of the URL after {@code async:kafka://} is used as the
     * {@code bootstrap.servers} property.
     *
     * @param dataNode node with an {@code async:kafka://} URL
     * @return bean id of the repository
     */
    private String kafkaAsyncRepository(DataNode dataNode) {
        String beanId = "KafkaAsyncRepository-" + dataNode.getUrl();

        if (!applicationContext.containsBean(beanId)) {
            String kafkaUrl = dataNode.getUrl().substring(KAFKA_URL_PREFIX.length());
            Map<String, Object> kafkaProperties = MapUtil.stringObjectMap("bootstrap.servers", kafkaUrl);

            new BeanBuilder() //
                    .id(beanId) //
                    .type(KafkaAsyncRepository.class) //
                    .property("properties", kafkaProperties).registerTo(applicationContext);
        }

        return beanId;
    }

    /**
     * Registers an {@link ActiveMQAsyncRepository} bean for the node if none
     * exists yet. The broker URL is {@code tcp://} followed by the part of the
     * node URL after {@code async:activemq://}; the node's username and
     * password are passed to the connection factory.
     *
     * @param dataNode node with an {@code async:activemq://} URL
     * @return bean id of the repository
     */
    private String activemqAsyncRepository(DataNode dataNode) {
        String beanId = "ActiveMQAsyncRepository-" + dataNode.getUrl();

        if (!applicationContext.containsBean(beanId)) {
            String username = dataNode.getUsername();
            String password = dataNode.getPassword();
            String brokerUrl = "tcp://" + dataNode.getUrl().substring(ACTIVEMQ_URL_PREFIX.length());

            new BeanBuilder() //
                    .id(beanId) //
                    .type(ActiveMQAsyncRepository.class) //
                    .property("connectionFactory", new ActiveMQConnectionFactory(username, password, brokerUrl)) //
                    .registerTo(applicationContext) //
                    .get();
        }

        return beanId;
    }

    /**
     * Receiving side: registers a {@link BudoAsyncServiceBean} that exposes
     * {@link DataConsumer} on the pipeline's destination, backed by the bean
     * named {@code dispatcherDataConsumer}.
     * <p>
     * NOTE(review): the bean id is derived from the node URL only, so a second
     * pipeline on the same node is skipped even if its destination name
     * differs - confirm this is intended.
     *
     * @param dataNode async node messages are received from
     * @param pipeline pipeline used to derive the destination name
     * @throws RuntimeException if the node's URL is not a supported async URL
     */
    public void asyncDataProducer(DataNode dataNode, Pipeline pipeline) {
        String beanId = "asyncDataProducer-" + dataNode.getUrl();
        String asyncRepositoryBeanName = this.asyncRepository(dataNode);

        if (applicationContext.containsBean(beanId)) {
            return;
        }

        String destinationName = PipelineUtil.destinationName(pipeline);

        BeanBuilder beanBuilder = new BeanBuilder() //
                .id(beanId) //
                .type(BudoAsyncServiceBean.class) //
                .property("interfaceType", DataConsumer.class) //
                .property("asyncRepositoryBeanName", asyncRepositoryBeanName) //
                .property("destinationName", destinationName) //
                .ref("instance", "dispatcherDataConsumer"); // received messages are handled by dispatcherDataConsumer

        beanBuilder.registerTo(applicationContext) //
                .get();

        log.info("#149 asyncRepositoryBeanName={}, beanBuilder={}", asyncRepositoryBeanName, beanBuilder);
    }

    /**
     * Sending side: returns the {@link BudoAsyncReferenceBean}-backed
     * {@link DataConsumer} for the node/pipeline pair, registering it on first use.
     *
     * @param dataNode async node messages are sent to
     * @param pipeline pipeline used to derive the bean id and the destination name
     * @return the consumer bean looked up from the application context
     * @throws RuntimeException if the node's URL is not a supported async URL
     */
    private DataConsumer asyncDataConsumer(DataNode dataNode, Pipeline pipeline) {
        String beanId = "asyncDataConsumer-" + dataNode.getUrl() + "-" + pipeline.getSourceTable() + "-" + pipeline.getTargetTable();
        String asyncRepositoryBeanName = this.asyncRepository(dataNode);

        if (!applicationContext.containsBean(beanId)) {
            String destinationName = PipelineUtil.destinationName(pipeline);

            new BeanBuilder() //
                    .id(beanId) //
                    .type(BudoAsyncReferenceBean.class) //
                    .property("interfaceType", DataConsumer.class) //
                    .property("asyncRepositoryBeanName", asyncRepositoryBeanName) //
                    .property("destinationName", destinationName) //
                    .registerTo(applicationContext);
        }

        return (DataConsumer) applicationContext.getBean(beanId);
    }
}